Infinispan 5.1

Introduction

MapReduce is a programming model and a framework for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. The MapReduce framework enables users to transparently parallelize their tasks and execute them on a large cluster of machines.
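To make the model concrete, here is a minimal single-JVM sketch of the two phases. It is only an illustration of the programming model: the class name, method names and the "city,temperature" record format are hypothetical and are not part of Infinispan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the map/reduce model; no Infinispan API is used.
class MapReduceModelSketch {

    // Map phase: turn each "city,temperature" record into an intermediate
    // (city, temperature) pair and group the pairs by intermediate key.
    static Map<String, List<Integer>> mapPhase(List<String> records) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            String[] parts = record.split(",");
            String city = parts[0].trim();
            int temperature = Integer.parseInt(parts[1].trim());
            grouped.computeIfAbsent(city, k -> new ArrayList<>()).add(temperature);
        }
        return grouped;
    }

    // Reduce phase: merge all intermediate values of one key into a single value
    // (here: the maximum temperature seen for that city).
    static Map<String, Integer> reducePhase(Map<String, List<Integer>> grouped) {
        Map<String, Integer> reduced = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int max = Integer.MIN_VALUE;
            for (int t : e.getValue()) {
                max = Math.max(max, t);
            }
            reduced.put(e.getKey(), max);
        }
        return reduced;
    }

    static Map<String, Integer> maxTemperaturePerCity(List<String> records) {
        return reducePhase(mapPhase(records));
    }

    public static void main(String[] args) {
        // prints {Boston=25, Toronto=18}
        System.out.println(maxTemperaturePerCity(
                Arrays.asList("Boston,21", "Toronto,18", "Boston,25")));
    }
}
```

In a real cluster the map calls run on many machines and the grouping step involves moving intermediate pairs between them; the sketch only shows the data flow.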

Scope and objectives

Infinispan's distributed execution framework has two goals: to provide a simple distributed execution framework for tasks that are already defined as Callables, and to provide an adapted version of Google's MapReduce framework for large computation tasks. The framework is described as "adapted" because the input data for map reduce tasks is taken from Infinispan nodes themselves rather than from input files, as was defined by the original proposal.

Cache data as task input data

Infinispan's distributed task execution and MapReduce frameworks use data from Infinispan nodes as input for execution tasks. Most other distributed frameworks do not have that advantage, and their users have to specify input for distributed tasks from some well-known location. Furthermore, users of Infinispan's distributed execution framework do not have to configure a store for intermediate and final results, which removes another layer of complexity and maintenance.

Load balanced execution - built in by default

Our distributed execution framework capitalizes on the fact that input data in an Infinispan data grid is already load balanced (DIST Infinispan mode). Since input data is already balanced, execution tasks will be automatically balanced as well; users do not have to explicitly assign work tasks to specific Infinispan nodes. However, the framework also allows users to specify an arbitrary subset of cache data as input for distributed execution tasks.

Distributed tasks input

After an execution task is assigned to a specific Infinispan node, the framework provides an API to access the local data on that node that serves as input to the task. Access to the Infinispan cache runtime environment is provided by the setEnvironment callback of DistributedCallable. Users can access the cache whose data is used as input for the distributed task, or any other cache, using the familiar CacheManager API.

Distributed task failover and migration

The distributed execution framework will support task failover. There are two orthogonal issues related to task failover:

a) Failover due to failure of the node on which the task is executing

b) Failover due to failure of the task itself (e.g. the Callable task throws an Exception).

Distributed tasks that take input data from Infinispan nodes themselves can rely on Infinispan's consistent hashing (CH) for failover of uncompleted tasks. CH-based failover will migrate a failed task T to the cluster node(s) holding a backup of the input data that used to belong to the failed node F. Task T executing on node F has to be re-spawned/migrated to node F', which is next to node F based on hash wheel position. Task migration continues until a running node that is a hash wheel neighbour of F is found. In other words, task migration continues until all hash wheel neighbours of F are exhausted or the task completes successfully. Using CH, each newly spawned task migrated to node F' will be able to locate the data that used to belong to failed node F. Implementation of a default node failover is planned for the first release, time permitting. For distributed tasks that do not rely on pulling data from Infinispan nodes we can provide other policies: fail-fast, fail-slow, etc.
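The hash wheel walk described above can be sketched with a sorted map. This is a simplified illustration, not Infinispan's consistent hash implementation: the class, its methods and the integer wheel positions are hypothetical, and the sketch ignores details such as the configured number of data owners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of a hash wheel: position on the wheel -> node name.
class HashWheelFailoverSketch {

    private final TreeMap<Integer, String> wheel = new TreeMap<>();

    void addNode(int position, String node) {
        wheel.put(position, node);
    }

    // Neighbours of the failed node in wheel order, starting with the node
    // right after it and wrapping around; the failed node itself is excluded.
    List<String> neighboursOf(String failedNode) {
        Integer failedPosition = null;
        for (Map.Entry<Integer, String> e : wheel.entrySet()) {
            if (e.getValue().equals(failedNode)) {
                failedPosition = e.getKey();
            }
        }
        if (failedPosition == null) {
            throw new IllegalArgumentException("Unknown node: " + failedNode);
        }
        List<String> ordered = new ArrayList<>(wheel.tailMap(failedPosition, false).values());
        ordered.addAll(wheel.headMap(failedPosition, false).values());
        return ordered;
    }

    // First running neighbour to which the task could be migrated,
    // or null when all neighbours have been exhausted.
    String migrationTarget(String failedNode, Set<String> runningNodes) {
        for (String candidate : neighboursOf(failedNode)) {
            if (runningNodes.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }
}
```

With nodes A, F, B and C placed at positions 10, 20, 30 and 40, a task that fails on F would be tried on B first, then C, then A.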

Both the node failover and the task failover policy will be pluggable. The initial implementation will define interfaces for implementing various node failover policies, but we will provide only a simple policy that throws an exception if a node fails. For task failure, the default initial implementation will simply re-spawn the failed task until it reaches some failure threshold. Future implementations might migrate such a failing task to another node.
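The "re-spawn until a failure threshold is reached" policy for task failure could look roughly like the following. This is a sketch under assumptions: the class and method names are hypothetical, and wrapping the last failure in an IllegalStateException is a choice made for this example, not documented framework behaviour.

```java
import java.util.concurrent.Callable;

// Hypothetical task failover policy: re-run a failing Callable a bounded number of times.
class RespawnPolicySketch {

    // Runs the task, re-spawning it after each failure, and gives up once
    // maxAttempts attempts have failed.
    static <T> T runWithRespawn(Callable<T> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                // Remember the failure and try again while under the threshold
                lastFailure = e;
            }
        }
        throw new IllegalStateException(
                "Task failed " + maxAttempts + " time(s), giving up", lastFailure);
    }
}
```

A pluggable design would hide this loop behind an interface so that a different policy, such as migrating the task to another node, can be swapped in later.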

Distributed execution model

The main interfaces for distributed task execution are DistributedCallable and DistributedExecutorService. DistributedCallable is a subtype of the existing Callable from the java.util.concurrent package; a DistributedCallable can be executed in a remote JVM and receive input from an Infinispan cache. The task's main algorithm can remain essentially unchanged; only the input source changes. Existing Callable implementations most likely get their input in the form of some Java object or primitive, while a DistributedCallable gets its input from an Infinispan cache. Therefore, users who have already implemented the Callable interface to describe their task units would simply implement DistributedCallable and use keys from the Infinispan execution environment as input for the task. An implementation of DistributedCallable can in fact continue to serve as an existing Callable while also being ready for distributed execution.

public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by execution environment after DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for this
    *           DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);

}

DistributedExecutorService is a simple extension of the familiar ExecutorService from the java.util.concurrent package. However, its advantages are not to be overlooked. Existing Callable tasks, instead of being executed in the JDK's ExecutorService, are also eligible for execution on an Infinispan cluster. The Infinispan execution environment migrates a task to the execution node(s), runs the task and returns the result(s) to the calling node. Of course, not all Callable tasks benefit from parallel distributed execution. Excellent candidates are long-running, computationally intensive tasks that can run concurrently, and/or tasks using input data that can be processed concurrently. For more details about good candidates for parallel execution and parallel algorithms in general, refer to Introduction to Parallel Computing.

The second advantage of the DistributedExecutorService is that it allows a quick and simple implementation of tasks that take input from Infinispan cache nodes, execute a certain computation and return results to the caller. Users specify which keys to use as input for a given DistributedCallable and submit that callable for execution on the Infinispan cluster. The Infinispan runtime locates the appropriate keys, migrates the DistributedCallable to the target execution node(s) and finally returns a list of results, one for each executed Callable. Of course, users can omit the input keys, in which case Infinispan executes the DistributedCallable on all keys of the specified cache.
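The setEnvironment/call lifecycle can be illustrated without a running cluster. In the sketch below a plain java.util.Map stands in for the Infinispan Cache, and LocalDistributedCallable is a hypothetical local copy of the shape of DistributedCallable; none of these names are Infinispan API, and the real framework would perform the injection on the remote node.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical single-JVM model of a task that takes its input from cache keys.
class KeyedTaskSketch {

    // Stand-in for DistributedCallable: a Map plays the role of the Cache.
    interface LocalDistributedCallable<K, V, T> extends Callable<T> {
        void setEnvironment(Map<K, V> cache, Set<K> inputKeys);
    }

    // Sums the lengths of the values stored under the input keys.
    static class TotalLengthTask implements LocalDistributedCallable<String, String, Integer> {
        private Map<String, String> cache;
        private Set<String> inputKeys;

        @Override
        public void setEnvironment(Map<String, String> cache, Set<String> inputKeys) {
            this.cache = cache;
            this.inputKeys = inputKeys;
        }

        @Override
        public Integer call() {
            int total = 0;
            for (String key : inputKeys) {
                String value = cache.get(key);
                if (value != null) {
                    total += value.length();
                }
            }
            return total;
        }
    }

    // Plays the role of the execution environment on one node:
    // inject the environment first, then run the task.
    // Passing no keys means "use all keys of the cache".
    static int runLocally(Map<String, String> cache, String... keys) {
        Set<String> inputKeys = keys.length == 0
                ? cache.keySet()
                : new LinkedHashSet<String>(Arrays.asList(keys));
        TotalLengthTask task = new TotalLengthTask();
        task.setEnvironment(cache, inputKeys);
        return task.call();
    }
}
```

The task body only ever sees the cache and the key set it was handed, which is why the same algorithm can run unchanged on whichever node owns those keys.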

MapReduce model

Infinispan's own MapReduce model is an adaptation of Google's original MapReduce. There are four main components in each map reduce task: Mapper, Reducer, Collator and MapReduceTask.

An implementation of Mapper is the component of a MapReduceTask that is invoked once for each input entry K,V. Every Mapper instance migrated to an Infinispan node, given a cache entry K,V input pair, transforms that input pair into an intermediate key/value pair emitted into a provided Collector. Intermediate results are further reduced using a Reducer.

public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {

   /**
    * Invoked once for each input cache entry KIn,VIn.
    */
   void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}

The Reducer, as its name implies, reduces a list of intermediate results from the map phase of a MapReduceTask. The Infinispan distributed execution environment creates one instance of Reducer per execution node.

public interface Reducer<KOut, VOut> extends Serializable {
   /**
    * Combines/reduces all intermediate values for a particular
    * intermediate key to a single value.
    */
   VOut reduce(KOut reducedKey, Iterator<VOut> iter);

}

Collator coordinates results from Reducers executed on the Infinispan cluster and assembles the final result returned to the invoker of MapReduceTask. The Collator is applied to the final Map<KOut,VOut> result of MapReduceTask.

public interface Collator<KOut, VOut, R> {
   /**
    * Collates all reduced results and returns R to invoker
    * of distributed task.
    *
    * @return final result of distributed task computation
    */
   R collate(Map<KOut, VOut> reducedResults);
}

Finally, MapReduceTask is a distributed task uniting Mapper, Reducer and Collator into a cohesive large-scale computation to be transparently parallelized across Infinispan cluster nodes. Users of MapReduceTask need to provide a cache whose data is used as input for this task. The Infinispan execution environment will instantiate and migrate instances of the provided mappers and reducers seamlessly across Infinispan nodes. Unless the input keys are restricted using the onKeys method, all available key/value pairs of the specified cache will be used as input data for this task.
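How the four components fit together, including the effect of restricting the input keys, can be modelled in a single JVM. The nested interfaces below are local stand-ins that copy the shapes of the interfaces shown above (without Serializable); the execute method, the orders data and the "customer:amount" value format are hypothetical, and the sketch says nothing about how Infinispan actually distributes the phases.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-JVM model of a map reduce task with a key filter and a collator.
class LocalMapReduceSketch {

    // Local stand-ins mirroring the shapes of the interfaces in the text.
    interface Collector<K, V> { void emit(K key, V value); }
    interface Mapper<KIn, VIn, KOut, VOut> { void map(KIn key, VIn value, Collector<KOut, VOut> collector); }
    interface Reducer<KOut, VOut> { VOut reduce(KOut reducedKey, Iterator<VOut> iter); }
    interface Collator<KOut, VOut, R> { R collate(Map<KOut, VOut> reducedResults); }

    // onKeys == null means "use every entry of the cache".
    static <KIn, VIn, KOut, VOut, R> R execute(Map<KIn, VIn> cache, Set<KIn> onKeys,
            Mapper<KIn, VIn, KOut, VOut> mapper, Reducer<KOut, VOut> reducer,
            Collator<KOut, VOut, R> collator) {
        // Map phase: collect intermediate values grouped by intermediate key
        final Map<KOut, List<VOut>> intermediate = new HashMap<>();
        Collector<KOut, VOut> collector =
                (k, v) -> intermediate.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : cache.entrySet()) {
            if (onKeys == null || onKeys.contains(e.getKey())) {
                mapper.map(e.getKey(), e.getValue(), collector);
            }
        }
        // Reduce phase: one reduced value per intermediate key
        Map<KOut, VOut> reduced = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : intermediate.entrySet()) {
            reduced.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue().iterator()));
        }
        // Collate phase: transform the reduced map into the final result
        return collator.collate(reduced);
    }

    // Example: orders maps orderId -> "customer:amount"; returns the customer
    // with the largest total amount among the selected orders.
    static String topCustomer(Map<String, String> orders, Set<String> onKeys) {
        Mapper<String, String, String, Integer> mapper = (orderId, order, c) -> {
            String[] parts = order.split(":");
            c.emit(parts[0], Integer.parseInt(parts[1]));
        };
        Reducer<String, Integer> reducer = (customer, amounts) -> {
            int sum = 0;
            while (amounts.hasNext()) {
                sum += amounts.next();
            }
            return sum;
        };
        Collator<String, Integer, String> collator = totals -> {
            String best = null;
            int bestTotal = Integer.MIN_VALUE;
            for (Map.Entry<String, Integer> e : totals.entrySet()) {
                if (e.getValue() > bestTotal) {
                    bestTotal = e.getValue();
                    best = e.getKey();
                }
            }
            return best;
        };
        return execute(orders, onKeys, mapper, reducer, collator);
    }
}
```

Restricting the key set changes only which entries reach the mapper; the reducer and collator are unaware of the filter.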

Examples

Pi approximation can greatly benefit from parallel distributed execution in DistributedExecutorService. Recall that the area of a square with side 2r is Sa = 4r^2 and the area of the circle inscribed in it is Ca = pi*r^2. Substituting r^2 from the second equation into the first one, it turns out that pi = 4 * Ca/Sa. Now, imagine that we can shoot a very large number of darts into the square; if we take the ratio of darts that land inside the circle over the total number of darts shot, we will approximate the value of Ca/Sa. Since we know that pi = 4 * Ca/Sa, we can easily derive an approximate value of pi. The more darts we shoot, the better the approximation. In the example below we shoot 10 million darts, but instead of "shooting" them serially we parallelize the work of dart shooting across the entire Infinispan cluster.

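import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;

public class PiAppx {

   public static void main (String [] arg) throws Exception {
      List<Cache> caches = ...;
      Cache cache = ...;

      int numPoints = 10000000;
      int numServers = caches.size();
      int numberPerWorker = numPoints / numServers;

      DistributedExecutorService des = new DefaultExecutorService(cache);
      long start = System.currentTimeMillis();
      CircleTest ct = new CircleTest(numberPerWorker);
      // Run the same task on every node; each node shoots numberPerWorker darts
      List<Future<Integer>> results = des.submitEverywhere(ct);
      int countCircle = 0;
      for (Future<Integer> f : results) {
         // get() blocks until that node's result arrives
         countCircle += f.get();
      }
      // Divide by the darts actually shot; numPoints may not divide evenly
      double appxPi = 4.0 * countCircle / (numberPerWorker * results.size());

      System.out.println("Distributed PI appx is " + appxPi +
      " completed in " + (System.currentTimeMillis() - start) + " ms");
   }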

   private static class CircleTest implements Callable<Integer>, Serializable {

      /** The serialVersionUID */
      private static final long serialVersionUID = 3496135215525904755L;

      private final int loopCount;

      public CircleTest(int loopCount) {
         this.loopCount = loopCount;
      }

      @Override
      public Integer call() throws Exception {
         int insideCircleCount = 0;
         for (int i = 0; i < loopCount; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }

      private boolean insideCircle(double x, double y) {
         return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
         <= Math.pow(0.5, 2);
      }
   }
}
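The reasoning behind the PiAppx estimate can be written out as a short derivation. The symbols N (total number of darts), k (number of nodes), n (darts per node) and c_i (hits reported by node i) are introduced here for illustration only; the error estimate assumes independent, uniformly distributed darts.

```latex
\begin{align*}
S_a &= (2r)^2 = 4r^2, \qquad C_a = \pi r^2
  \;\Longrightarrow\; \pi = 4\,\frac{C_a}{S_a} \\
\hat{\pi} &= 4\,\frac{\sum_{i=1}^{k} c_i}{N}, \qquad N = k\,n \\
\operatorname{sd}(\hat{\pi}) &= \sqrt{\frac{\pi\,(4-\pi)}{N}} \approx \frac{1.64}{\sqrt{N}}
\end{align*}
```

Under these assumptions, 10 million darts give a typical error on the order of 5 × 10^-4, and quadrupling the number of darts halves it.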

Word count is a classic, if not overused, example of the map/reduce paradigm. Assume we have a mapping of key --> sentence stored on Infinispan nodes. Each key is a String, each sentence is also a String, and we have to count the occurrences of all words in all available sentences. The implementation of such a distributed task could be defined as follows:

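import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;

import org.infinispan.Cache;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;

public class WordCountExample {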

   /**
    * In this example replace c1 and c2 with
    * real Cache references
    *
    * @param args
    */
   public static void main(String[] args) {
      Cache c1 = null;
      Cache c2 = null;

      c1.put("1", "Hello world here I am");
      c2.put("2", "Infinispan rules the world");
      c1.put("3", "JUDCon is in Boston");
      c2.put("4", "JBoss World is in Boston as well");
      c1.put("12","JBoss Application Server");
      c2.put("15", "Hello world");
      c1.put("14", "Infinispan community");
      c2.put("15", "Hello world");

      c1.put("111", "Infinispan open source");
      c2.put("112", "Boston is close to Toronto");
      c1.put("113", "Toronto is a capital of Ontario");
      c2.put("114", "JUDCon is cool");
      c1.put("211", "JBoss World is awesome");
      c2.put("212", "JBoss rules");
      c1.put("213", "JBoss division of RedHat ");
      c2.put("214", "RedHat community");

      MapReduceTask<String, String, String, Integer> t =
         new MapReduceTask<String, String, String, Integer>(c1);
      t.mappedWith(new WordCountMapper())
         .reducedWith(new WordCountReducer());
      Map<String, Integer> wordCountMap = t.execute();
   }

   static class WordCountMapper implements Mapper<String,String,String,Integer> {
      /** The serialVersionUID */
      private static final long serialVersionUID = -5943370243108735560L;

      @Override
      public void map(String key, String value, Collector<String, Integer> c) {
         StringTokenizer tokens = new StringTokenizer(value);
         while (tokens.hasMoreTokens()) {
            // Emit an intermediate (word, 1) pair for every word in the sentence
            c.emit(tokens.nextToken(), 1);
         }
      }
   }

   static class WordCountReducer implements Reducer<String, Integer> {
      /** The serialVersionUID */
      private static final long serialVersionUID = 1901016598354633256L;

      @Override
      public Integer reduce(String key, Iterator<Integer> iter) {
         int sum = 0;
         while (iter.hasNext()) {
            // Add up all the 1s emitted for this word
            sum += iter.next();
         }
         return sum;
      }
   }
}

As we have seen, it is relatively easy to specify a map reduce task that counts the number of occurrences of each word in all sentences. Best of all, the result is returned to the task invoker in the form of a Map<KOut,VOut> rather than being written to a stream.

What if we need to find the most frequent word in our word count example? All we have to do is define a Collator that transforms the Map<KOut,VOut> result of MapReduceTask into a String, which in turn is returned to the task invoker. We can think of a Collator as a transformation function applied to the final result of a MapReduceTask.

MapReduceTask<String, String, String, Integer> t =
      new MapReduceTask<String, String, String, Integer>(cache);
t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
String mostFrequentWord = t.execute(
      new Collator<String,Integer,String>() {

         @Override
         public String collate(Map<String, Integer> reducedResults) {
            String mostFrequent = "";
            int maxCount = 0;
            for (Entry<String, Integer> e : reducedResults.entrySet()) {
               Integer count = e.getValue();
               if(count > maxCount) {
                  maxCount = count;
                  mostFrequent = e.getKey();
               }
            }
            return mostFrequent;
         }

      });
System.out.println("The most frequent word is " + mostFrequentWord);
JBoss.org Content Archive (Read Only), exported from JBoss Community Documentation Editor at 2020-03-11 09:16:41 UTC, last content change 2011-07-11 20:37:32 UTC.